Kafka SelectDB Connector

Kafka Connect is a scalable and reliable tool for data transmission between Apache Kafka and other systems. Connectors can be defined to move large amounts of data in and out of Kafka.

SelectDB provides a Sink Connector plug-in that writes JSON data from Kafka topics to SelectDB.
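For example, assuming a target table test_kafka_tbl with columns id and name (a hypothetical schema for illustration), each message in the topic would be a JSON object whose keys match the column names:

```json
{"id": 1, "name": "kafka-connect-test"}
```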

Version support

| Kafka | Java | Runtime Jar |
| ----- | ---- | ----------- |
| 2.4.x | 8    | kafka-connect-selectdb-1.0.0 |

Kafka Connect Usage

Standalone mode

Configure connect-standalone.properties

# broker address
bootstrap.servers=127.0.0.1:9092

Configure connect-selectdb-sink.properties

name=test-selectdb-sink
connector.class=com.selectdb.kafka.connector.SelectdbSinkConnector
topics=topic
selectdb.topic2table.map=topic:test_kafka_tbl
buffer.count.records=10000
buffer.flush.time=60
buffer.size.bytes=5000000
selectdb.url=xxx.cn-beijing.privatelink.aliyuncs.com
selectdb.http.port=48614
selectdb.query.port=25865
selectdb.user=admin
selectdb.password=
selectdb.database=test_db
selectdb.cluster=cluster_name
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false

Start

$KAFKA_HOME/bin/connect-standalone.sh $KAFKA_HOME/config/connect-standalone.properties $KAFKA_HOME/config/connect-selectdb-sink.properties
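Once the worker starts, records written to the subscribed topic are flushed to test_kafka_tbl. A quick way to exercise the pipeline (a sketch, assuming a broker on 127.0.0.1:9092 and a table whose columns match the JSON keys) is the console producer shipped with Kafka:

```shell
# Send one JSON record to the topic the connector subscribes to
echo '{"id": 1, "name": "kafka-connect-test"}' | \
  $KAFKA_HOME/bin/kafka-console-producer.sh \
    --bootstrap-server 127.0.0.1:9092 \
    --topic topic
```

After at most buffer.flush.time seconds the record should be visible in test_kafka_tbl.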

Distributed mode

Configure connect-distributed.properties

# broker address
bootstrap.servers=127.0.0.1:9092

# group.id must be the same for all workers in the same Connect cluster
group.id=connect-cluster

Start

$KAFKA_HOME/bin/connect-distributed.sh $KAFKA_HOME/config/connect-distributed.properties

Add connector

curl -i http://127.0.0.1:8083/connectors -H "Content-Type: application/json" -X POST -d '{
  "name": "test-selectdb-sink-cluster",
  "config": {
    "connector.class": "com.selectdb.kafka.connector.SelectdbSinkConnector",
    "topics": "topic",
    "selectdb.topic2table.map": "topic:test_kafka_tbl",
    "buffer.count.records": "10000",
    "buffer.flush.time": "60",
    "buffer.size.bytes": "5000000",
    "selectdb.url": "xx.cn-beijing.privatelink.aliyuncs.com",
    "selectdb.user": "admin",
    "selectdb.password": "",
    "selectdb.http.port": "48614",
    "selectdb.query.port": "25865",
    "selectdb.database": "test_db",
    "selectdb.cluster": "cluster_name",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "key.converter.schemas.enable": "false",
    "value.converter.schemas.enable": "false"
  }
}'
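The Connect REST API rejects malformed JSON, and a stray trailing comma in a hand-edited payload is a common cause. One defensive approach (a sketch, not part of the connector itself) is to keep the connector definition in a file, lint it, and only then POST it:

```shell
# Keep the connector definition in a file so it can be validated and versioned
cat > /tmp/selectdb-sink.json <<'EOF'
{
  "name": "test-selectdb-sink-cluster",
  "config": {
    "connector.class": "com.selectdb.kafka.connector.SelectdbSinkConnector",
    "topics": "topic",
    "selectdb.topic2table.map": "topic:test_kafka_tbl",
    "buffer.count.records": "10000",
    "buffer.flush.time": "60",
    "buffer.size.bytes": "5000000",
    "selectdb.url": "xx.cn-beijing.privatelink.aliyuncs.com",
    "selectdb.user": "admin",
    "selectdb.password": "",
    "selectdb.http.port": "48614",
    "selectdb.query.port": "25865",
    "selectdb.database": "test_db",
    "selectdb.cluster": "cluster_name",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "key.converter.schemas.enable": "false",
    "value.converter.schemas.enable": "false"
  }
}
EOF

# Fail fast on JSON syntax errors before touching the REST API
python3 -m json.tool /tmp/selectdb-sink.json > /dev/null && echo "config OK"
```

The file can then be submitted with `curl -X POST -H "Content-Type: application/json" -d @/tmp/selectdb-sink.json http://127.0.0.1:8083/connectors`, and the connector's state checked later via the standard `/connectors/<name>/status` endpoint.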

Access an SSL-certified Kafka cluster

Accessing an SSL-authenticated Kafka cluster through Kafka Connect requires the user to provide a truststore file (client.truststore.jks) used to verify the Kafka broker's public key. You can add the following configuration to the connect-distributed.properties file:

# Connect worker
security.protocol=SSL
ssl.truststore.location=/var/ssl/private/client.truststore.jks
ssl.truststore.password=test1234

# Embedded consumer for sink connectors
consumer.security.protocol=SSL
consumer.ssl.truststore.location=/var/ssl/private/client.truststore.jks
consumer.ssl.truststore.password=test1234

For instructions on configuring Kafka clusters with SSL authentication through kafka-connect, please refer to: Configure Kafka Connect

Configuration items

| Key | Default Value | Required | Description |
| --- | --- | --- | --- |
| name | - | Y | Connector application name; must be unique within the Kafka Connect environment |
| connector.class | - | Y | com.selectdb.kafka.connector.SelectdbSinkConnector |
| topics | - | Y | List of topics to subscribe to, separated by commas: topic1,topic2 |
| selectdb.url | - | Y | SelectDB connection address |
| selectdb.http.port | - | Y | SelectDB HTTP protocol port |
| selectdb.query.port | - | Y | SelectDB MySQL protocol port |
| selectdb.user | - | Y | SelectDB username |
| selectdb.password | - | Y | SelectDB password |
| selectdb.database | - | Y | Database to write to |
| selectdb.cluster | - | Y | Name of the cluster to write to |
| selectdb.topic2table.map | - | N | Mapping between topics and tables, for example: topic1:tb1,topic2:tb2. Empty by default, meaning topic and table names correspond one-to-one |
| buffer.count.records | 10000 | N | Number of records buffered in memory per Kafka partition before flushing to SelectDB |
| buffer.flush.time | 120 | N | Buffer flush interval, in seconds |
| buffer.size.bytes | 5000000 (5MB) | N | Cumulative size of records buffered in memory per Kafka partition, in bytes |
| jmx | true | N | JMX is enabled by default to expose the connector's internal monitoring metrics |
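As an example of the topic-to-table mapping, a connector subscribed to two topics (hypothetical names orders and users) could route each to its own table:

```properties
topics=orders,users
selectdb.topic2table.map=orders:orders_tbl,users:users_tbl
```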

For other general configuration items of Kafka Connect Sink, please refer to: Kafka Connect Sink Configuration Properties